Introduction

Overview

Plan

At the end of this session, you will have learned how to:

  • Take advantage of the verbs and syntax you learned from the dplyr module to manipulate RxXdfData data objects
  • Summarize your RxXdfData objects quickly and easily
  • Create custom functions and use them for mutations and summarizations
  • Understand when to use the dplyrXdf package and when to use functions from the RevoScaleR package

The Microsoft R Family

Microsoft R Family

Microsoft R Component Stack

Why dplyrXdf?

Simplify Your Analysis Pipeline

  • The RevoScaleR package enables R users to manipulate data that is larger than memory
  • It introduces a new data format, the xdf (short for eXternal Data Frame), which is a highly efficient format for out-of-memory data
  • However, many of the RevoScaleR functions have a dramatically different syntax from base R functions
  • The dplyr package is exceptionally popular, thanks to its appealing syntax and its extensibility

Simpler Analysis with dplyrXdf

  • The dplyrXdf package exposes most of dplyr's functionality for xdf objects
  • Many data analysis pipelines require creating many intermediate datasets, which are needed only to derive a final dataset and have little or no use on their own
  • dplyrXdf abstracts away this file-management task, so that you can focus on the data itself rather than on managing intermediate files
  • Unlike dplyr or base R, dplyrXdf lets you work with data residing outside of memory, so it scales to datasets limited by disk space rather than by RAM

Requirements

What You’ll Need

  • You have already completed the dplyr training
  • You understand the XDF data type and how to import data into XDF
  • If you’re working on a different computer than your trainer's, you have devtools (github.com/hadley/devtools) installed (and, on a Windows machine, Rtools)

Installing dplyrXdf

  • The dplyrXdf package is not yet on CRAN
  • You have to install it from GitHub
  • If you’re on a Windows machine, install Rtools as well
  • The devtools package provides a very handy function, install_github, for installing R packages stored in GitHub repositories
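The installation steps above can be wrapped in a small helper, so that re-running your setup script does not reinstall anything. This is a minimal sketch. The function name ensure_dplyrxdf is hypothetical, and the code assumes the package lives in the RevolutionAnalytics/dplyrXdf GitHub repository. Check with your trainer if it has moved.

```r
# Hypothetical helper: install devtools and dplyrXdf only if they are missing.
# ASSUMPTION: dplyrXdf is hosted at the "RevolutionAnalytics/dplyrXdf" GitHub repo.
ensure_dplyrxdf <- function(repo = "RevolutionAnalytics/dplyrXdf") {
  if (!requireNamespace("devtools", quietly = TRUE)) {
    install.packages("devtools")
  }
  if (!requireNamespace("dplyrXdf", quietly = TRUE)) {
    devtools::install_github(repo)   # on Windows this also needs Rtools
  }
  # TRUE if dplyrXdf can now be loaded
  invisible(requireNamespace("dplyrXdf", quietly = TRUE))
}

# ensure_dplyrxdf()
```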

Create XDF from taxi data

Create a local directory to save XDF

your_name <- "alizaidi"
your_dir <- paste0('/datadrive/', your_name)
# File Path to your Data
your_data <- file.path(your_dir, 'tripdata_2015.xdf')
dir.create(your_dir)
## Warning in dir.create(your_dir): '/datadrive/alizaidi' already exists
download.file("http://alizaidi.blob.core.windows.net/training/yellow_tripdata_2015.xdf", 
              destfile = your_data, mode = "wb")  # binary mode, needed on Windows
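The xdf file is large (37,696,906 rows, per rxGetInfo below), so you may want to skip the download when the file is already on disk. Below is a sketch using a hypothetical helper, fetch_if_missing(). Setting mode = "wb" makes download.file write in binary mode, which matters on Windows.

```r
# Hypothetical helper: download url to destfile only if destfile does not exist yet.
fetch_if_missing <- function(url, destfile) {
  if (!file.exists(destfile)) {
    dir.create(dirname(destfile), recursive = TRUE, showWarnings = FALSE)
    download.file(url, destfile = destfile, mode = "wb")
  }
  invisible(destfile)
}

# fetch_if_missing("http://alizaidi.blob.core.windows.net/training/yellow_tripdata_2015.xdf",
#                  your_data)
```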

Create a Pointer to XDF

library(dplyrXdf)
taxi_xdf <- RxXdfData(your_data)
taxi_xdf %>% head
##   VendorID passenger_count trip_distance RateCodeID store_and_fwd_flag
## 1        2               1          1.59          1                  N
## 2        1               1          3.30          1                  N
## 3        1               1          1.80          1                  N
## 4        1               1          0.50          1                  N
## 5        1               1          3.00          1                  N
## 6        1               1          9.00          1                  N
##   payment_type fare_amount tip_amount tolls_amount pickup_hour pickup_dow
## 1            1        12.0       3.25         0.00        6-10        Thu
## 2            1        14.5       2.00         0.00        6-10        Sat
## 3            2         9.5       0.00         0.00        6-10        Sat
## 4            2         3.5       0.00         0.00        6-10        Sat
## 5            2        15.0       0.00         0.00        6-10        Sat
## 6            1        27.0       6.70         5.33        6-10        Sat
##   dropoff_hour dropoff_dow        pickup_nhood    dropoff_nhood kSplits
## 1         6-10         Thu    Garment District      Murray Hill       D
## 2         6-10         Sat                Soho          Clinton       G
## 3         6-10         Sat Morningside Heights Hamilton Heights       A
## 4         6-10         Sat             Tribeca          Tribeca       B
## 5         6-10         Sat             Midtown          Chelsea       F
## 6         6-10         Sat                <NA>          Midtown       I
class(taxi_xdf)
## [1] "RxXdfData"
## attr(,"package")
## [1] "RevoScaleR"

Simplified Pipelines for Data Summaries

Data Transforms

The rxDataStep Way

  • Everything the dplyrXdf package does can also be done directly with RevoScaleR functions such as rxDataStep, which come with your MRS installation
  • In fact, dplyrXdf consists almost entirely of wrapper functions that call on other RevoScaleR functions
  • Let’s compare the workflow for adding a new column to a dataset with rxDataStep vs dplyrXdf

taxi_xdf %>% rxGetInfo(getVarInfo = TRUE)
## File name: /datadrive/alizaidi/tripdata_2015.xdf 
## Number of observations: 37696906 
## Number of variables: 16 
## Number of blocks: 21 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID
##        2 factor levels: 2 1
## Var 2: passenger_count, Type: integer, Low/High: (1, 9)
## Var 3: trip_distance, Type: numeric, Low/High: (0.0000, 49.9000)
## Var 4: RateCodeID
##        7 factor levels: 1 2 5 3 4 99 6
## Var 5: store_and_fwd_flag
##        2 factor levels: N Y
## Var 6: payment_type
##        5 factor levels: 1 2 3 4 5
## Var 7: fare_amount, Type: numeric, Low/High: (0.0100, 503325.5300)
## Var 8: tip_amount, Type: numeric, Low/High: (-0.0100, 3950588.8000)
## Var 9: tolls_amount, Type: numeric, Low/High: (0.0000, 1000.6600)
## Var 10: pickup_hour
##        6 factor levels: 5-9 9-12 12-4 4-6 6-10 10-5
## Var 11: pickup_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 12: dropoff_hour
##        6 factor levels: 5-9 9-12 12-4 4-6 6-10 10-5
## Var 13: dropoff_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 14: pickup_nhood
##        263 factor levels: 19th Ward Abbott McKinley Albright Allen Annandale ... Woodhaven-Richmond Hill Woodlawn-Nordwood Woodrow Woodside Yorkville
## Var 15: dropoff_nhood
##        263 factor levels: 19th Ward Abbott McKinley Albright Allen Annandale ... Woodhaven-Richmond Hill Woodlawn-Nordwood Woodrow Woodside Yorkville
## Var 16: kSplits
##        10 factor levels: A B C D E F G H I J

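# Note: this points at the same file as taxi_xdf, so the rxDataStep call below
# rewrites tripdata_2015.xdf in place (adding tip_pct). Use a new path to keep the original.
taxi_transform <- RxXdfData(your_data)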

system.time(taxi_transform <- rxDataStep(inData = taxi_xdf,
           outFile = taxi_transform,
           transforms = list(tip_pct = tip_amount/fare_amount),
           overwrite = TRUE))
##    user  system elapsed 
##   1.502   2.070  41.808

rxGetInfo(taxi_transform, numRows = 2)
## File name: /datadrive/alizaidi/tripdata_2015.xdf 
## Number of observations: 37696906 
## Number of variables: 17 
## Number of blocks: 21 
## Compression type: zlib 
## Data (2 rows starting with row 1):
##   VendorID passenger_count trip_distance RateCodeID store_and_fwd_flag
## 1        2               1          1.59          1                  N
## 2        1               1          3.30          1                  N
##   payment_type fare_amount tip_amount tolls_amount pickup_hour pickup_dow
## 1            1        12.0       3.25            0        6-10        Thu
## 2            1        14.5       2.00            0        6-10        Sat
##   dropoff_hour dropoff_dow     pickup_nhood dropoff_nhood kSplits
## 1         6-10         Thu Garment District   Murray Hill       D
## 2         6-10         Sat             Soho       Clinton       G
##     tip_pct
## 1 0.2708333
## 2 0.1379310
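The transforms argument holds ordinary R expressions that RevoScaleR evaluates on each block of rows. That means you can sanity-check an expression on a small in-memory sample before running it over the whole file. Here is a minimal sketch using the first two rows printed above, copied by hand:

```r
# The first two rows printed above, copied by hand
sample_rows <- data.frame(fare_amount = c(12.0, 14.5),
                          tip_amount  = c(3.25, 2.00))

# Same expression as transforms = list(tip_pct = tip_amount/fare_amount)
sample_rows <- within(sample_rows, tip_pct <- tip_amount / fare_amount)
sample_rows
```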

Data Transforms

The dplyrXdf Way

  • We could do the same operation with dplyrXdf, using the exact same syntax that we learned in the dplyr module and taking advantage of the %>% operator
system.time(taxi_transform <- taxi_xdf %>% mutate(tip_pct = tip_amount/fare_amount))
##    user  system elapsed 
##   1.537   2.104  43.378
taxi_transform %>% rxGetInfo(numRows = 2)
## File name: /tmp/RtmphytGxr/file7373ff7d85e.xdf 
## Number of observations: 37696906 
## Number of variables: 17 
## Number of blocks: 21 
## Compression type: zlib 
## Data (2 rows starting with row 1):
##   VendorID passenger_count trip_distance RateCodeID store_and_fwd_flag
## 1        2               1          1.59          1                  N
## 2        1               1          3.30          1                  N
##   payment_type fare_amount tip_amount tolls_amount pickup_hour pickup_dow
## 1            1        12.0       3.25            0        6-10        Thu
## 2            1        14.5       2.00            0        6-10        Sat
##   dropoff_hour dropoff_dow     pickup_nhood dropoff_nhood kSplits
## 1         6-10         Thu Garment District   Murray Hill       D
## 2         6-10         Sat             Soho       Clinton       G
##     tip_pct
## 1 0.2708333
## 2 0.1379310

Differences

  • The major difference between the rxDataStep operation and the dplyrXdf method is that we do not specify an outFile argument anywhere in the dplyrXdf pipeline
  • In our case, we assigned the result of mutate to the variable taxi_transform
  • dplyrXdf saves that result to a temporary file, and it keeps only the most recent output of a pipeline, where a pipeline is all the operations starting from a raw xdf file
  • To copy an xdf from the temporary directory to permanent storage, use the persist verb

taxi_transform@file
## [1] "/tmp/RtmphytGxr/file7373ff7d85e.xdf"
persist(taxi_transform, outFile = "taxiTransform.xdf") -> taxi_transform

Using dplyrXdf for Aggregations

dplyrXdf Way

  • The dplyrXdf package really shines when used for data aggregations and summarizations
  • rxSummary, rxCube and rxCrossTabs compute a handful of summary statistics and aggregations very quickly, but they are not general enough to cover every case

taxi_group <- taxi_transform %>%
  group_by(pickup_nhood) %>% 
  summarize(ave_tip_pct = mean(tip_pct))
taxi_group %>% head
##               pickup_nhood ave_tip_pct
## 1                Annandale  0.13750000
## 2            Ardon Heights  0.03814815
## 3 Astoria-Long Island City  0.12307670
## 4               Auburndale  0.14224544
## 5             Battery Park  0.15798303
## 6                Bay Ridge  0.40164238

Using dplyrXdf for Aggregations

rxCube Way

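  • The above could have been done with rxCube as well, but it would require additional considerations
  • We would have to make sure that the pickup_nhood column was a factor (we can’t mutate it in place, because the data type changes)
  • rxCube can only provide sums and averages, so we cannot get standard deviations, for instance
  • Creating your own factors is never a pleasant experience: you may feel like everything is going right until you faceplant

In the run below, rxFactors warns that it made no changes (pickup_nhood is already a factor and sortLevels is FALSE), and rxCube then returns a row for every one of the 263 factor levels, including empty ones with a NaN mean and a count of 0.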

rxFactors(inData = taxi_transform, 
          outFile = "/datadrive/alizaidi/taxi_factor.xdf", 
          factorInfo = c("pickup_nhood"), 
          overwrite = TRUE)
## Warning in factorInfoVarList(factorInfo[i], varInfo, sortLevelsDefault = sortLevels, : 
##   No changes will be made to the factor variable 'pickup_nhood'
##   because 'sortLevels = FALSE' and there is no 'indexMap'.
## Warning in rxFactorsBase(inData = dataIO[["inData"]], factorInfo =
## factorInfo, : No changes made to the data set.
head(rxCube(tip_pct ~ pickup_nhood, 
            means = TRUE, 
            data = "/datadrive/alizaidi/taxi_factor.xdf"))
##      pickup_nhood tip_pct Counts
## 1       19th Ward     NaN      0
## 2 Abbott McKinley     NaN      0
## 3        Albright     NaN      0
## 4           Allen     NaN      0
## 5       Annandale  0.1375      2
## 6      Arbor Hill     NaN      0
# file.remove("/datadrive/alizaidi/taxi_factor.xdf")
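The zero-count rows come from the factor's declared levels. rxCube reports a cell for every level, whether or not it occurs. By contrast, the dplyrXdf output earlier lists only neighborhoods that actually appear. The base R sketch below reproduces the same effect on made-up toy data and then drops the empty cells. It is an in-memory illustration, not RevoScaleR code. rxCube may also accept a removeZeroCounts argument for this. That is an assumption, so check ?rxCube in your RevoScaleR version.

```r
# Toy data, made up for illustration: the factor declares three levels
# but only two of them occur, as with most of pickup_nhood's 263 levels.
hood <- factor(c("Annandale", "Annandale", "Battery Park"),
               levels = c("19th Ward", "Annandale", "Battery Park"))
tip_pct <- c(0.10, 0.20, 0.16)

counts <- table(hood)                   # 0 for levels that never occur
means  <- tapply(tip_pct, hood, mean)   # NA for those same empty levels

cube_like <- data.frame(pickup_nhood = names(counts),
                        tip_pct      = as.vector(means),
                        Counts       = as.vector(counts))

# Keep only the cells that actually contain observations
non_empty <- cube_like[cube_like$Counts > 0, ]
non_empty
```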

Creating Functional Pipelines with dplyrXdf

As we saw above, it’s pretty easy to create a summarization or aggregation script, and we can encapsulate an aggregation in its own function. Suppose we want to calculate the average tip as a function of dropoff and pickup neighborhoods. In dplyr nomenclature, this means grouping by dropoff and pickup neighborhoods and summarizing (averaging) tip percent.

rxGetInfo(taxi_transform, numRows = 5)
## File name: /home/alizaidi/mr4ds/Student-Resources/rmarkdown/taxiTransform.xdf 
## Number of observations: 37696906 
## Number of variables: 17 
## Number of blocks: 76 
## Compression type: zlib 
## Data (5 rows starting with row 1):
##   VendorID passenger_count trip_distance RateCodeID store_and_fwd_flag
## 1        2               1          1.59          1                  N
## 2        1               1          3.30          1                  N
## 3        1               1          1.80          1                  N
## 4        1               1          0.50          1                  N
## 5        1               1          3.00          1                  N
##   payment_type fare_amount tip_amount tolls_amount pickup_hour pickup_dow
## 1            1        12.0       3.25            0        6-10        Thu
## 2            1        14.5       2.00            0        6-10        Sat
## 3            2         9.5       0.00            0        6-10        Sat
## 4            2         3.5       0.00            0        6-10        Sat
## 5            2        15.0       0.00            0        6-10        Sat
##   dropoff_hour dropoff_dow        pickup_nhood    dropoff_nhood kSplits
## 1         6-10         Thu    Garment District      Murray Hill       D
## 2         6-10         Sat                Soho          Clinton       G
## 3         6-10         Sat Morningside Heights Hamilton Heights       A
## 4         6-10         Sat             Tribeca          Tribeca       B
## 5         6-10         Sat             Midtown          Chelsea       F
##     tip_pct
## 1 0.2708333
## 2 0.1379310
## 3 0.0000000
## 4 0.0000000
## 5 0.0000000

mht_url <- "http://alizaidi.blob.core.windows.net/training/manhattan.rds"
manhattan_hoods <- readRDS(gzcon(url(mht_url)))

taxi_transform %>% 
    filter(pickup_nhood %in% mht_hoods,
           dropoff_nhood %in% mht_hoods, 
           .rxArgs = list(transformObjects = list(mht_hoods = manhattan_hoods))) %>% 
    group_by(dropoff_nhood, pickup_nhood) %>% 
    summarize(ave_tip = mean(tip_pct), 
              ave_dist = mean(trip_distance)) %>% 
    filter(ave_dist > 3, ave_tip > 0.05) -> sum_df

sum_df %>% rxGetInfo(getVarInfo = TRUE, numRows = 5)
## File name: /tmp/RtmphytGxr/file73737ad66f7d.xdf 
## Number of observations: 408 
## Number of variables: 4 
## Number of blocks: 1 
## Compression type: zlib 
## Variable information: 
## Var 1: dropoff_nhood
##        263 factor levels: 19th Ward Abbott McKinley Albright Allen Annandale ... Woodhaven-Richmond Hill Woodlawn-Nordwood Woodrow Woodside Yorkville
## Var 2: pickup_nhood
##        263 factor levels: 19th Ward Abbott McKinley Albright Allen Annandale ... Woodhaven-Richmond Hill Woodlawn-Nordwood Woodrow Woodside Yorkville
## Var 3: ave_tip, Type: numeric, Low/High: (0.0579, 0.1876)
## Var 4: ave_dist, Type: numeric, Low/High: (3.0004, 13.9100)
## Data (5 rows starting with row 1):
##      dropoff_nhood pickup_nhood   ave_tip ave_dist
## 1     Central Park Battery Park 0.1160478 6.028228
## 2          Clinton Battery Park 0.1300344 3.878010
## 3      East Harlem Battery Park 0.1102473 9.982296
## 4     East Village Battery Park 0.1401632 3.952964
## 5 Garment District Battery Park 0.1312443 3.874708
class(sum_df)
## [1] "grouped_tbl_xdf"
## attr(,"package")
## [1] "dplyrXdf"
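The filter step above needs .rxArgs = list(transformObjects = ...) because RevoScaleR evaluates row selections and transforms in their own environment. As the transformEnvir documentation describes, this is by default a new environment whose parent is baseenv(). Objects in your workspace, such as manhattan_hoods, are therefore invisible unless passed explicitly. The sketch below is a simplified, hypothetical model of that evaluation in base R. select_rows() is not a RevoScaleR function.

```r
# Hypothetical, simplified model of chunked row selection: the chunk's columns plus
# any transform objects go into a fresh environment whose parent is baseenv(),
# so variables defined in the global workspace are not visible to the condition.
select_rows <- function(chunk, condition, transform_objects = list()) {
  env  <- list2env(c(as.list(chunk), transform_objects), parent = baseenv())
  keep <- eval(condition, envir = env)
  chunk[keep, , drop = FALSE]
}

chunk <- data.frame(pickup_nhood = c("Soho", "Bay Ridge", "Tribeca"),
                    stringsAsFactors = FALSE)
cond  <- quote(pickup_nhood %in% mht_hoods)

# Works: mht_hoods is supplied explicitly, like transformObjects
kept <- select_rows(chunk, cond, list(mht_hoods = c("Soho", "Tribeca")))
kept

# Fails with "object 'mht_hoods' not found", even though mht_hoods exists globally
mht_hoods <- c("Soho", "Tribeca")
res <- tryCatch(select_rows(chunk, cond), error = function(e) conditionMessage(e))
res
```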

Alternatively, we can encapsulate this script into a function, so that we can easily call it in a functional pipeline.

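taxi_hood_sum <- function(taxi_data = taxi_transform, ...) {
  
  # `...` is forwarded to filter(), so callers can pass
  # .rxArgs = list(transformObjects = list(manhattan_hoods = manhattan_hoods))
  taxi_data %>% 
    filter(pickup_nhood %in% manhattan_hoods,
           dropoff_nhood %in% manhattan_hoods, ...) %>% 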
    group_by(dropoff_nhood, pickup_nhood) %>% 
    summarize(ave_tip = mean(tip_pct), 
              ave_dist = mean(trip_distance)) %>% 
    filter(ave_dist > 3, ave_tip > 0.05) -> sum_df
  
  return(sum_df)
  
}

The resulting summary object isn’t very large (408 rows in this case), so converting it to a data.frame now shouldn’t cause any memory issues. We can then plot our results using our favorite plotting library.

# df: an in-memory data.frame of neighborhood summaries, e.g. as.data.frame(sum_df)
tile_plot_hood <- function(df) {
  
  library(ggplot2)
  
  ggplot(data = df, aes(x = pickup_nhood, y = dropoff_nhood)) + 
    geom_tile(aes(fill = ave_tip), colour = "white") + 
    theme_bw() + 
    theme(axis.text.x = element_text(angle = 45, hjust = 1),
          legend.position = 'bottom') + 
    scale_fill_gradient(low = "white", high = "steelblue") -> gplot
  
  return(gplot)
}

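# tile_plot_hood(as.data.frame(sum_df))
# Recreate taxi_transform from the raw xdf
taxi_transform <- taxi_xdf %>% mutate(tip_pct = tip_amount/fare_amount)
library(plotly)
# Summarize, passing manhattan_hoods to the filter step, then save the result permanently
sum_df <- taxi_hood_sum(taxi_transform, 
                        .rxArgs = list(transformObjects = list(manhattan_hoods = manhattan_hoods))) %>% 
  persist(file.path(your_dir, "summarized.xdf"))
# The summary is small, so it is safe to pull it into memory for plotting
ggplotly(tile_plot_hood(as.data.frame(sum_df)))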

Splitting and Combining Operations with doXdf

Custom functions across groups

The do verb is an exception to the rule that dplyrXdf verbs write their output as xdf files. This is because do executes arbitrary R code and can return arbitrary R objects. A data frame can hold such objects (for example, fitted models in a list-column), but an xdf file is limited to character and numeric vectors.
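To see how a data frame can hold arbitrary R objects, here is the in-memory split-apply-combine pattern that do() follows conceptually. It is written in base R on made-up toy data: one fitted lm model per group, stored in a list-column.

```r
# Toy data, made up for illustration
toy <- data.frame(
  pickup_dow  = factor(c("Mon", "Mon", "Mon", "Tue", "Tue", "Tue")),
  fare_amount = c(5, 10, 20, 8, 12, 30),
  tip_amount  = c(1, 2, 4, 1, 2, 5)
)

# Split into one data frame per group and fit a model to each group
pieces <- split(toy, toy$pickup_dow)
models <- lapply(pieces, function(d) lm(tip_amount ~ fare_amount, data = d))

# Combine: one row per group, with the fitted models stored in a list-column
by_day <- data.frame(pickup_dow = names(models))
by_day$model <- unname(models)

coef(by_day$model[[1]])   # Mon: tips are exactly 20% of fares here
```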

The doXdf verb is similar to do, but where do splits its input into one data frame per group, doXdf splits it into one xdf file per group. This allows do-like functionality with grouped data, where each group can be arbitrarily large. The syntax for the two functions is essentially the same, although the code passed to doXdf must obviously know how to handle xdfs.
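The one-file-per-group idea can be sketched in base R, with RDS files standing in for xdf files. The helpers split_to_files() and apply_to_files() are hypothetical. They only illustrate why each group can be large: the applied function sees one group's data at a time. Unlike doXdf, this toy version splits a data frame that is already in memory.

```r
# Hypothetical helpers: write each group to its own file, then apply a function
# to one file at a time (RDS files stand in for xdf files).
split_to_files <- function(df, group_var, dir = tempfile("groups_")) {
  dir.create(dir, showWarnings = FALSE)
  pieces <- split(df, df[[group_var]])
  paths  <- file.path(dir, paste0(group_var, ".", names(pieces), ".rds"))
  for (i in seq_along(pieces)) saveRDS(pieces[[i]], paths[i])
  setNames(paths, names(pieces))
}

apply_to_files <- function(paths, fun) {
  # Each group is read from disk only when its turn comes
  lapply(paths, function(p) fun(readRDS(p)))
}

toy <- data.frame(pickup_dow  = c("Mon", "Mon", "Tue", "Tue", "Tue"),
                  fare_amount = c(5, 10, 8, 12, 30))
paths <- split_to_files(toy, "pickup_dow")
group_sizes <- apply_to_files(paths, nrow)
group_sizes
```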


taxi_models <- taxi_xdf %>% 
  group_by(pickup_dow) %>% 
  doXdf(model = rxLinMod(tip_amount ~ fare_amount, data = .))
taxi_models
## Source: local data frame [7 x 2]
## Groups: <by row>
## 
## # A tibble: 7 × 2
##   pickup_dow          model
## *     <fctr>         <list>
## 1        Fri <S3: rxLinMod>
## 2        Mon <S3: rxLinMod>
## 3        Sat <S3: rxLinMod>
## 4        Sun <S3: rxLinMod>
## 5        Thu <S3: rxLinMod>
## 6        Tue <S3: rxLinMod>
## 7        Wed <S3: rxLinMod>
taxi_models$model[[1]]
## Call:
## rxLinMod(formula = tip_amount ~ fare_amount, data = .)
## 
## Linear Regression Results for: tip_amount ~ fare_amount
## Data: . (RxXdfData Data Source)
## File name: /tmp/RtmphytGxr/file73731f59af6.pickup_dow.Fri.xdf
## Dependent variable(s): tip_amount
## Total independent variables: 2 
## Number of valid observations: 5885510
## Number of missing observations: 0 
##  
## Coefficients:
##               tip_amount
## (Intercept) 1.6567742764
## fare_amount 0.0007725297

Memory Issues

All the caveats that go with working with data.frames apply here. While each grouped partition is its own RxXdfData object, the return value must be a data.frame, and hence must fit in memory. Moreover, the function you apply to each split determines how that split is processed. If you use an rx function, you get the fault-tolerant, parallel execution strategies the RevoScaleR package provides. Vanilla/CRAN functions such as lm, however, work on in-memory data.frames, so each group must fit in memory, and they can easily crash your session.
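A back-of-the-envelope estimate can help you decide whether a CRAN function is safe to apply per group. The sketch below assumes 8 bytes per value, which holds for numeric columns (factor columns use 4-byte integer codes). It also ignores the extra copies that lm() makes for its model matrix, so treat the result as a rough order of magnitude, not a bound. approx_df_bytes() is a hypothetical helper. The inputs come from the output above: 5,885,510 valid observations for the Friday group and 16 variables in the xdf.

```r
# Hypothetical helper: rough size of an n_rows x n_cols data.frame of 8-byte values
approx_df_bytes <- function(n_rows, n_cols, bytes_per_value = 8) {
  n_rows * n_cols * bytes_per_value
}

# Friday group: 5,885,510 rows (rxLinMod output above), 16 columns (rxGetInfo above)
fri_bytes <- approx_df_bytes(5885510, 16)
fri_bytes / 1024^3   # about 0.7 GiB before lm() makes any copies
```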


library(broom)
taxi_broom <- taxi_xdf %>% 
  group_by(pickup_dow) %>% 
  doXdf(model = lm(tip_amount ~ fare_amount, data = .))

Now we can apply the broom::tidy function at the row level to get summary statistics:

library(broom)
tbl_df(taxi_broom) %>% tidy(model)
## Source: local data frame [14 x 6]
## Groups: pickup_dow [7]
## 
##    pickup_dow        term     estimate    std.error    statistic
##        <fctr>       <chr>        <dbl>        <dbl>        <dbl>
## 1         Sun (Intercept) 2.3135094526 7.571328e-01    3.0556190
## 2         Sun fare_amount 0.0004142143 3.428908e-03    0.1208006
## 3         Mon (Intercept) 0.0874830714 1.422496e-03   61.4996928
## 4         Mon fare_amount 0.1273401404 8.615129e-05 1478.0991027
## 5         Tue (Intercept) 1.6038100602 1.087692e-03 1474.5078486
## 6         Tue fare_amount 0.0071964814 2.400197e-05  299.8287270
## 7         Wed (Intercept) 0.0115389375 1.377781e-03    8.3750150
## 8         Wed fare_amount 0.1386170412 8.666624e-05 1599.4352913
## 9         Thu (Intercept) 0.2319764306 1.340523e-03  173.0491740
## 10        Thu fare_amount 0.1180815950 8.138576e-05 1450.8876936
## 11        Fri (Intercept) 1.6567742764 1.035926e-03 1599.3176836
## 12        Fri fare_amount 0.0007725297 7.730892e-06   99.9276322
## 13        Sat (Intercept) 1.4252806941 8.801076e-04 1619.4390909
## 14        Sat fare_amount 0.0025861524 1.405561e-05  183.9943833
## # ... with 1 more variables: p.value <dbl>
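If broom is not available, you can build a similar table from summary.lm() in base R. This is a minimal sketch. tidy_lm() is a hypothetical helper that mimics the column names of broom's tidy(), including the p.value column truncated in the output above. It is demonstrated on R's built-in cars data.

```r
# Hypothetical base-R stand-in for broom::tidy() on an lm fit:
# one row per coefficient, using broom's column names.
tidy_lm <- function(fit) {
  co <- summary(fit)$coefficients
  data.frame(term      = rownames(co),
             estimate  = co[, "Estimate"],
             std.error = co[, "Std. Error"],
             statistic = co[, "t value"],
             p.value   = co[, "Pr(>|t|)"],
             row.names = NULL,
             stringsAsFactors = FALSE)
}

# Example on R's built-in cars data
fit <- lm(dist ~ speed, data = cars)
tidy_lm(fit)
```

Applied to each element of a model list-column (for example with lapply and do.call(rbind, ...)), this gives one coefficient table per group.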